package com.shaonaiyi.hadoop.filetype.avro;

import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapreduce.AvroJob;
import org.apache.avro.mapreduce.AvroKeyOutputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.task.JobContextImpl;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;

import java.io.IOException;

/**
 * Writes Avro {@code Person} records to HDFS by driving Hadoop's {@link OutputFormat} /
 * {@link OutputCommitter} API directly, without submitting a MapReduce job.
 *
 * @Author shaonaiyi@163.com
 * @Date 2019/12/17 17:15
 */
public class MRAvroFileWriter {

    /** Output directory used when no path is given on the command line. */
    private static final String DEFAULT_OUTPUT_PATH =
            "hdfs://master:9999/user/hadoop-sny/mr/filetype/avro";

    /**
     * Writes one sample {@code Person} record as an Avro container file under the output directory.
     *
     * @param args optional; {@code args[0]}, when present, overrides the output directory
     *             (default {@value #DEFAULT_OUTPUT_PATH})
     * @throws IOException            if writing or committing the output fails
     * @throws IllegalAccessException if the configured OutputFormat cannot be instantiated
     * @throws InstantiationException if the configured OutputFormat cannot be instantiated
     * @throws ClassNotFoundException if the configured OutputFormat class cannot be resolved
     * @throws InterruptedException   if writing is interrupted
     */
    public static void main(String[] args) throws IOException, IllegalAccessException, InstantiationException, ClassNotFoundException, InterruptedException {
        String outputPath = (args != null && args.length > 0) ? args[0] : DEFAULT_OUTPUT_PATH;

        // 1. Build a Job instance; here it only serves as a holder for the output configuration.
        Configuration hadoopConf = new Configuration();
        Job job = Job.getInstance(hadoopConf);

        // 2. Configure Avro output: keys carry Person records, values are unused (NullWritable).
        job.setOutputFormatClass(AvroKeyOutputFormat.class);
        AvroJob.setOutputKeySchema(job, Person.SCHEMA$);

        // 3. Set the output directory.
        FileOutputFormat.setOutputPath(job, new Path(outputPath));

        // 4. Build the JobContext.
        JobID jobId = new JobID("jobId", 123);
        JobContext jobContext = new JobContextImpl(job.getConfiguration(), jobId);

        // 5. Build the TaskAttemptContext. Derive the attempt ID from the same JobID so that the
        //    job and task contexts describe one consistent job.
        TaskAttemptID attemptId = new TaskAttemptID(new TaskID(jobId, TaskType.REDUCE, 0), 0);
        TaskAttemptContext taskContext = new TaskAttemptContextImpl(job.getConfiguration(), attemptId);

        // 6. Instantiate the configured OutputFormat reflectively. The cast matches the type
        //    configured in step 2 (AvroKeyOutputFormat writes AvroKey<T> keys, NullWritable values).
        @SuppressWarnings("unchecked")
        OutputFormat<AvroKey<Person>, NullWritable> format =
                (OutputFormat<AvroKey<Person>, NullWritable>) job.getOutputFormatClass().newInstance();

        // 7. Set up the job and the task through the OutputCommitter.
        OutputCommitter committer = format.getOutputCommitter(taskContext);
        committer.setupJob(jobContext);
        committer.setupTask(taskContext);

        try {
            // 8. Write the data. The writer is always closed, even if a write fails.
            RecordWriter<AvroKey<Person>, NullWritable> writer = format.getRecordWriter(taskContext);
            try {
                writer.write(new AvroKey<>(createSamplePerson()), NullWritable.get());
            } finally {
                writer.close(taskContext);
            }

            // 9. Commit the task, then the job.
            committer.commitTask(taskContext);
            committer.commitJob(jobContext);
        } catch (IOException | InterruptedException | RuntimeException e) {
            // Abort so the committer can discard any partially written attempt output.
            abortQuietly(committer, jobContext, taskContext, e);
            throw e;
        }
    }

    /** Builds the single sample record written by {@link #main}. */
    private static Person createSamplePerson() {
        Person person = new Person();
        person.setName("jeffy");
        person.setAge(20);
        person.setFavoriteNumber(10);
        person.setFavoriteColor("red");
        return person;
    }

    /**
     * Aborts the task and the job after a failure. Any exception raised while aborting is attached
     * to {@code cause} as a suppressed exception, so the original failure is never masked.
     */
    private static void abortQuietly(OutputCommitter committer, JobContext jobContext,
                                     TaskAttemptContext taskContext, Exception cause) {
        try {
            committer.abortTask(taskContext);
            committer.abortJob(jobContext, JobStatus.State.FAILED);
        } catch (IOException | RuntimeException abortFailure) {
            cause.addSuppressed(abortFailure);
        }
    }

}
